001 /*
002 * Copyright 2005 Stephen J. McConnell.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
013 * implied.
014 *
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019 package net.dpml.station.server;
020
021 import java.rmi.AlreadyBoundException;
022 import java.rmi.RemoteException;
023 import java.rmi.server.UnicastRemoteObject;
024 import java.rmi.registry.LocateRegistry;
025 import java.rmi.registry.Registry;
026 import java.net.URL;
027 import java.util.Map;
028 import java.util.Hashtable;
029 import java.util.LinkedList;
030 import java.util.List;
031 import java.util.EventObject;
032
033 import net.dpml.station.Application;
034 import net.dpml.station.Callback;
035 import net.dpml.station.Manager;
036 import net.dpml.station.Station;
037 import net.dpml.station.StationException;
038
039 import net.dpml.station.info.ApplicationDescriptor;
040 import net.dpml.station.info.StartupPolicy;
041 import net.dpml.station.ApplicationRegistry;
042
043 import net.dpml.util.Logger;
044 import net.dpml.transit.model.TransitModel;
045 import net.dpml.transit.Disposable;
046
047 import net.dpml.lang.UnknownKeyException;
048
049 /**
050 * The RemoteStation is responsible for the establishment of
051 * callback monitors to external processes established by the
052 * station manager.
053 * @author <a href="http://www.dpml.net">Digital Product Meta Library</a>
054 * @version 1.0.0
055 */
056 public class RemoteStation extends UnicastRemoteObject implements Station, Manager
057 {
058 private final RemoteApplicationRegistry m_registry;
059 private final Map m_applications = new Hashtable();
060 private final Logger m_logger;
061 private final int m_port;
062 private final Registry m_rmiRegistry;
063 private final URL m_store;
064 private final TransitModel m_model;
065 private final LoggingServer m_server;
066 private final Thread m_thread;
067
068 private boolean m_terminated = false;
069
070 /**
071 * Creation of a station instance.
072 *
073 * @param logger the assigned logging channel
074 * @param model the transit model
075 * @param port the station port
076 * @param registryStorageUrl uri defining the registry backing store
077 * @exception Exception if a exception occurs during establishment
078 */
079 public RemoteStation(
080 Logger logger, TransitModel model, int port, URL registryStorageUrl )
081 throws Exception
082 {
083 super();
084
085 m_logger = logger;
086 m_port = port;
087 m_store = registryStorageUrl;
088 m_model = model;
089
090 m_rmiRegistry = getLocalRegistry( port );
091
092 try
093 {
094 m_rmiRegistry.bind( STATION_KEY, this );
095 }
096 catch( AlreadyBoundException e )
097 {
098 final String error =
099 "An instance of the Station is already bound to port " + port;
100 throw new StationException( error, e );
101 }
102
103 setShutdownHook( this );
104 startEventDispatchThread();
105
106 try
107 {
108 m_server = new LoggingServer( 2020 );
109 m_thread = new Thread( m_server, "DPML Station Logging Server" );
110 m_thread.start();
111 }
112 catch( Exception e )
113 {
114 final String error =
115 "Unexpected error while attempting to start the logging server on port " + 2020;
116 throw new StationException( error, e );
117 }
118
119 if( getLogger().isDebugEnabled() )
120 {
121 if( null == registryStorageUrl )
122 {
123 getLogger().debug( "loading registry from default storage" );
124 }
125 else
126 {
127 getLogger().debug( "loading registry from [" + registryStorageUrl + "]" );
128 }
129 }
130
131 m_registry = new RemoteApplicationRegistry( logger, registryStorageUrl );
132 String[] keys = m_registry.getKeys();
133
134 if( getLogger().isDebugEnabled() )
135 {
136 getLogger().debug( "registry established (" + keys.length + ")" );
137 }
138
139 for( int i=0; i<keys.length; i++ )
140 {
141 String key = keys[i];
142 try
143 {
144 ApplicationDescriptor descriptor =
145 m_registry.getApplicationDescriptor( key );
146 if( StartupPolicy.AUTOMATIC.equals( descriptor.getStartupPolicy() ) )
147 {
148 RemoteApplication application = getRemoteApplication( key );
149 application.start();
150 }
151 }
152 catch( UnknownKeyException e )
153 {
154 throw new RuntimeException( e ); // will not happen
155 }
156 }
157 }
158
159 /**
160 * Return a string containing info about the general setup of the station.
161 * @return station configuration info
162 */
163 public String[] getInfo()
164 {
165 String[] values = new String[4];
166 values[0] = "Port: " + m_port;
167 values[1] = "Store: " + m_store;
168 values[2] = "Basedir: " + System.getProperty( "user.dir" );
169 values[3] = "Codebase: "
170 + getClass().getProtectionDomain().getCodeSource().getLocation();
171 return values;
172 }
173
174 /**
175 * Return an callback handler for the supplied id.
176 * @param id the callback id
177 * @return the callback handler
178 * @exception UnknownKeyException if the id is unknown
179 * @exception RemoteException if a remote error occurs
180 */
181 public Callback getCallback( String id ) throws UnknownKeyException, RemoteException
182 {
183 // TODO: improve this so that this is only called once per appliation
184 return getRemoteApplication( id );
185 }
186
187 /**
188 * Shutdown the station.
189 */
190 public void shutdown()
191 {
192 shutdown( true );
193 }
194
195 /**
196 * Shutdown the station.
197 * @param exit if true launch a process termination
198 */
199 private void shutdown( boolean exit )
200 {
201 synchronized( m_applications )
202 {
203 if( m_terminated )
204 {
205 return;
206 }
207 else
208 {
209 m_terminated = true;
210 }
211
212 if( getLogger().isInfoEnabled() )
213 {
214 getLogger().info( "initiating station shutdown" );
215 }
216
217 try
218 {
219 m_rmiRegistry.unbind( STATION_KEY );
220 }
221 catch( Exception e )
222 {
223 // ignore
224 }
225 try
226 {
227 RemoteApplication[] applications = getRemoteApplications();
228 for( int i=0; i<applications.length; i++ )
229 {
230 RemoteApplication application = applications[i];
231 application.shutdown();
232 UnicastRemoteObject.unexportObject( application, true );
233 }
234 UnicastRemoteObject.unexportObject( m_registry, true );
235 }
236 catch( Exception e )
237 {
238 // ignore
239 }
240 try
241 {
242 int n = m_server.getErrorCount();
243 if( n > 0 )
244 {
245 getLogger().warn( "logging issues: " + n );
246 }
247 m_thread.interrupt();
248 }
249 catch( Exception e )
250 {
251 // ignore
252 }
253
254 finally
255 {
256 if( getLogger().isInfoEnabled() )
257 {
258 getLogger().info( "station shutdown complete" );
259 }
260
261 if( exit )
262 {
263 if( m_model instanceof Disposable )
264 {
265 try
266 {
267 Disposable disposable = (Disposable) m_model;
268 disposable.dispose();
269 }
270 catch( Exception e )
271 {
272 // ignore
273 }
274 }
275
276 if( getLogger().isDebugEnabled() )
277 {
278 getLogger().debug( "terminating process" );
279 }
280
281 Thread thread = new Thread(
282 new Runnable()
283 {
284 public void run()
285 {
286 RemoteStation.m_DISPATCH.dispose();
287 System.exit( 0 );
288 }
289 }
290 );
291 thread.start();
292 }
293 }
294 }
295 }
296
297 /**
298 * Return the application registry.
299 * @return the registry
300 */
301 public ApplicationRegistry getApplicationRegistry()
302 {
303 return m_registry;
304 }
305
306 /**
307 * Return an application reference for the supplied key.
308 * @param key the application key
309 * @return the application
310 * @exception UnknownKeyException if the key is unknown
311 * @exception RemoteException if a remote error occurs
312 */
313 public Application getApplication( String key ) throws UnknownKeyException, RemoteException
314 {
315 return getRemoteApplication( key );
316 }
317
318 /**
319 * Return an application reference for the supplied key.
320 * @param key the application key
321 * @return the application
322 * @exception UnknownKeyException if the key is unknown
323 * @exception RemoteException if a remote error occurs
324 */
325 RemoteApplication getRemoteApplication( String key ) throws UnknownKeyException, RemoteException
326 {
327 synchronized( m_applications )
328 {
329 if( m_applications.containsKey( key ) )
330 {
331 return (RemoteApplication) m_applications.get( key );
332 }
333 else
334 {
335 Logger logger = getLogger().getChildLogger( key );
336 ApplicationDescriptor descriptor = m_registry.getApplicationDescriptor( key );
337 RemoteApplication application =
338 new RemoteApplication( logger, descriptor, key, m_port );
339 m_applications.put( key, application );
340 return application;
341 }
342 }
343 }
344
345 /**
346 * Return an array of all remote applications.
347 * @return the applications array
348 */
349 RemoteApplication[] getRemoteApplications()
350 {
351 synchronized( m_applications )
352 {
353 return (RemoteApplication[]) m_applications.values().toArray( new RemoteApplication[0] );
354 }
355 }
356
357 private Logger getLogger()
358 {
359 return m_logger;
360 }
361
362 private Registry getLocalRegistry( int port ) throws RemoteException
363 {
364 try
365 {
366 Registry registry = LocateRegistry.createRegistry( port );
367 getLogger().debug( "created local registry on port " + port );
368 return registry;
369 }
370 catch( RemoteException e )
371 {
372 Registry registry = LocateRegistry.getRegistry( port );
373 getLogger().debug( "using local registry on port " + port );
374 return registry;
375 }
376 }
377
378 /**
379 * Queue of pending notification events. When an event for which
380 * there are one or more listeners occurs, it is placed on this queue
381 * and the queue is notified. A background thread waits on this queue
382 * and delivers the events. This decouples event delivery from
383 * the application concern, greatly simplifying locking and reducing
384 * opportunity for deadlock.
385 */
386 private static final List EVENT_QUEUE = new LinkedList();
387
388 /**
389 * Enqueue an event for delivery to registered
390 * listeners unless there are no registered
391 * listeners.
392 * @param event the event to enqueue
393 */
394 static void enqueueEvent( EventObject event )
395 {
396 synchronized( EVENT_QUEUE )
397 {
398 EVENT_QUEUE.add( event );
399 EVENT_QUEUE.notify();
400 }
401 }
402
403 /**
404 * A single background thread ("the event notification thread") monitors
405 * the event queue and delivers events that are placed on the queue.
406 */
407 private static class EventDispatchThread extends Thread
408 {
409 private final Logger m_logger;
410
411 private boolean m_continue = true;
412
413 EventDispatchThread( Logger logger )
414 {
415 super( "DPML Station Event Dispatch" );
416 m_logger = logger;
417 m_logger.debug( "starting event dispatch thread" );
418 }
419
420 void dispose()
421 {
422 synchronized( EVENT_QUEUE )
423 {
424 m_logger.debug( "stopping event dispatch thread" );
425 m_continue = false;
426 EVENT_QUEUE.notify();
427 }
428 }
429
430 public void run()
431 {
432 while( m_continue )
433 {
434 // Wait on EVENT_QUEUE till an event is present
435 EventObject event = null;
436 synchronized( EVENT_QUEUE )
437 {
438 try
439 {
440 while( EVENT_QUEUE.isEmpty() )
441 {
442 EVENT_QUEUE.wait();
443 }
444 Object object = EVENT_QUEUE.remove( 0 );
445 try
446 {
447 event = (EventObject) object;
448 }
449 catch( ClassCastException cce )
450 {
451 final String error =
452 "Unexpected class cast exception while processing an event."
453 + "\nEvent: " + object;
454 throw new IllegalStateException( error );
455 }
456 }
457 catch( InterruptedException e )
458 {
459 return;
460 }
461 }
462
463 Object source = event.getSource();
464 if( source instanceof UnicastEventSource )
465 {
466 UnicastEventSource producer = (UnicastEventSource) source;
467 try
468 {
469 producer.processEvent( event );
470 }
471 catch( Throwable e )
472 {
473 final String error =
474 "Unexpected error while processing event."
475 + "\nEvent: " + event
476 + "\nSource: " + source;
477 m_logger.warn( error, e );
478 }
479 }
480 else
481 {
482 final String error =
483 "Event source ["
484 + source.getClass().getName()
485 + "] is not an instance of " + UnicastEventSource.class.getName();
486 throw new IllegalStateException( error );
487 }
488 }
489
490 m_logger.info( "Controller event queue terminating." );
491 }
492 }
493
494 private static EventDispatchThread m_DISPATCH = null;
495
496 /**
497 * This method starts the event dispatch thread the first time it
498 * is called. The event dispatch thread will be started only
499 * if someone registers a listener.
500 */
501 private synchronized void startEventDispatchThread()
502 {
503 if( m_DISPATCH == null )
504 {
505 Logger logger = getLogger();
506 m_DISPATCH = new EventDispatchThread( logger );
507 m_DISPATCH.setDaemon( true );
508 m_DISPATCH.start();
509 }
510 }
511
512 /**
513 * Create a shutdown hook that will trigger shutdown of the supplied plugin.
514 * @param station the station
515 */
516 public static void setShutdownHook( final RemoteStation station )
517 {
518 //
519 // Create a shutdown hook to trigger clean disposal of the
520 // controller
521 //
522
523 Runtime.getRuntime().addShutdownHook(
524 new Thread()
525 {
526 public void run()
527 {
528 try
529 {
530 station.shutdown();
531 }
532 catch( Throwable e )
533 {
534 System.err.println( e.toString() );
535 }
536 System.runFinalization();
537 }
538 }
539 );
540 }
541
542 }